package com.paic.common.middleware.util

import java.util.Properties

import com.paic.common.util.ReadPropertyUtils
import org.apache.spark.sql.{DataFrame, SaveMode}

object MySQLPropertyUtils {
  def getMySqlProperties(): Properties = {
    val prop = new Properties()
    val userName = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.username")
    val passWord = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.password")
    prop.put("user", userName)
    prop.put("password", passWord)
    return prop
  }

  def saveDataFrameToDB(tableNameIndex: String, resultDateFrame: DataFrame) = {
    val tableName = ReadPropertyUtils.getFileProperties("mysql-user.properties", tableNameIndex)
    val jdbcURL = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.url")
    val resultwriter = resultDateFrame.write.mode("append")
    resultwriter.jdbc(jdbcURL, tableName, getMySqlProperties())
  }
  
  def saveDataFrameToDBOverwrite(tableNameIndex: String, resultDateFrame: DataFrame) = {
    val tableName = ReadPropertyUtils.getFileProperties("mysql-user.properties", tableNameIndex)
    val jdbcURL = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.url")
    val resultwriter = resultDateFrame.write.mode(SaveMode.Overwrite)
    resultwriter.jdbc(jdbcURL, tableName, getMySqlProperties())
  }

  def getInsertSql(tableName : String, colNumbers : Int) : String = {
    var sqlStr = "insert into " + tableName + " values("
    for (i <- 1 to colNumbers) {
      sqlStr += "?"
      if (i != colNumbers) {
        sqlStr += ", "
      }
    } 
    sqlStr += ")"
    sqlStr
  }
  
  /**
   * 通过c3p0的连接池方法，向mysql写入数据。
   * 注意：暂只支持mysql中数据字段全为string的表；存储引擎为myisam时，写入效率较低
   */
  def saveDataFrameToDBByPool(tableName : String, resultDateFrame: DataFrame) {
    val colNumbers = resultDateFrame.columns.length
    val sql = getInsertSql(tableName, colNumbers)
    resultDateFrame.foreachPartition(partitionRecords => {
      //从连接池中获取一个连接
    	val conn = MysqlManager.getMysqlManager.getConnection
      val preparedStatement = conn.prepareStatement(sql)
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach(record => {
          //注意:setString方法从1开始，record.getString()方法从0开始
          for (i <- 1 to colNumbers) {
            preparedStatement.setString(i, record.getString(i - 1))
          }
          preparedStatement.addBatch()
        })
        preparedStatement.executeBatch()
        conn.commit()
      } catch {
        case e: Exception => println(e.getMessage)
        // do some log
      } finally {
        preparedStatement.close()
        conn.close()
      }
    })
  }
  
  def getMySqlPoolConfigProperties(): Properties = {
    val prop = new Properties()
    val userName = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.userName")
    val passWord = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.passWord")
    val jdbcURL = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.url")
    val driverClass = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.driverClass")
    val minPoolSize = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.minPoolSize")
    val maxPoolSize = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.maxPoolSize")
    val acquireIncrement = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.acquireIncrement")
    val maxStatements = ReadPropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.maxStatements")
    prop.put("userName", userName)
    prop.put("password", passWord)
    prop.put("jdbcURL", jdbcURL)
    prop.put("driverClass", driverClass)
    prop.put("minPoolSize", minPoolSize)
    prop.put("maxPoolSize", maxPoolSize)
    prop.put("acquireIncrement", acquireIncrement)
    prop.put("maxStatements", maxStatements)
    return prop
  }
}